package com.cn.lcc.schedue.task;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cn.lcc.util.config.PropertiesLoader;


/**
 * @author lcc  
 * @date 2017年6月12日
 * @备注   
 */
public class TaskUtil {

    private static Logger logger = LoggerFactory.getLogger(TaskUtil.class);

    public static PropertiesConfiguration task = PropertiesLoader.getConfiguration("task");
    public static PropertiesConfiguration dbConfig = PropertiesLoader.getConfiguration("database");

    private static Map<Integer, TaskPool> pools = new ConcurrentHashMap<Integer, TaskPool>();

    public static TaskPool getPool(TaskType taskType) {
        TaskPool taskPool = null;
        synchronized (TaskUtil.class) {
            Integer key = taskType.getTypeId();
            if (pools.containsKey(key)) {
                taskPool = pools.get(key);
            } else {
                logger.info("TaskPool setup, typeId={}, typeName={}",
                        taskType.getTypeId(), taskType.getTypeName());
                taskPool = new TaskPool();
                taskPool.setTaskQueue(new ArrayBlockingQueue<Runnable>(task
                        .getInt("task.queue.max", 500)));
                taskPool.setPool(new ThreadPoolExecutor(task.getInt(
                        "task.thread.pool.size.min", 20), task.getInt(
                        "task.thread.pool.size.max", 50), task.getInt(
                        "task.thread.pool.keepAliveTimeSeconds", 10),
                        TimeUnit.SECONDS, taskPool.getTaskQueue(),
                        new ThreadPoolExecutor.AbortPolicy()));
                pools.put(key, taskPool);
                logger.info(
                        "TaskPool setup successfully, typeId={}, typeName={}",
                        taskType.getTypeId(), taskType.getTypeName());
            }
        }
        return taskPool;
    }

    public static boolean isPoolFull(TaskType taskType) {
        return (getPool(taskType).getTaskQueue().size() == task.getInt(
                "task.queue.max", 500));
    }

    public static int getPoolRecordFetchSize(TaskType taskType) {
        return dbConfig
                .getInt("db.fetch.size.job." + taskType.getTypeId(), 100);
    }

    public static void execute(TaskType taskType, Task task) {
        TaskPool taskPool = getPool(taskType);
        if (taskPool == null) {
            logger.error("不存在工作队列, typeId={}, typeName={}",
                    taskType.getTypeId(), taskType.getTypeName());
        }
        // 当队列满了，而且正在运行的线程数量大于或等于maximumPoolSize时，会抛出RejectedExecutionException
        // 此时需要任务还原
        try {
            taskPool.getPool().execute(task);
        } catch (Exception e) {
            task.recover();
        }
    }

    public static void closeAllPool() {
        logger.info("关闭任务线程池中...");
        for (TaskPool pool : pools.values()) {
            pool.getPool().shutdown();
        }
        logger.info("关闭任务线程池完毕.");
    }
}
